// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package pbft

import (
	"errors"
	"net"
	"strings"

	pb "github.com/33cn/chain33/types"
	"github.com/golang/protobuf/proto"
)

// Protocol tuning constants.
//
// CheckPointPeriod is the sequence interval tested by isCheckpoint: a
// sequence number that is a multiple of it is treated as a checkpoint.
// ConstantFactor multiplies CheckPointPeriod to give the distance between the
// low and the high water mark (see highWaterMark).
const (
	CheckPointPeriod = uint32(128)
	ConstantFactor   = uint32(2)
)

// Replica holds the state of one PBFT node.
//
// Fields, as used by the code in this file:
//   - ID: this replica's identifier; compared with primary() to decide
//     whether the replica leads the current view.
//   - replicas: peer address by replica index, filled from the
//     comma-separated peer list given to NewReplica.
//   - activeView: set to true by NewReplica.
//   - view: current view number; NewReplica starts it at 1.
//   - sequence: counter incremented for every client request handled.
//   - requestChan: protocol requests to send to peers; drained by sendRoutine.
//   - replyChan: replies produced for clients.
//   - errChan: send errors reported by sendRoutine.
//   - doneChan: created by NewReplica; not used by the code visible here.
//   - requests: message log keyed by request kind ("client", "pre-prepare",
//     "prepare", "commit", "checkpoint", "view-change", "ack", "new-view").
//   - replies: replies logged per client name, oldest first.
//   - lastReply: reply produced by the most recent execution in
//     handleRequestCommit.
//   - pendingVC: view-change requests stored by logPendingVC.
//   - executed: sequence numbers appended as requests are executed.
//   - checkpoints: known checkpoints; the last element is returned by
//     lastStable.
type Replica struct {
	ID          uint32
	replicas    map[uint32]string
	activeView  bool
	view        uint32
	sequence    uint32
	requestChan chan *pb.Request
	replyChan   chan *pb.ClientReply
	errChan     chan error
	doneChan    chan string
	requests    map[string][]*pb.Request
	replies     map[string][]*pb.ClientReply
	lastReply   *pb.ClientReply
	pendingVC   []*pb.Request
	executed    []uint32
	checkpoints []*pb.Checkpoint
}

// NewReplica builds a Replica, starts its network goroutines and returns the
// handles a caller needs to drive it.
//
// id is this replica's identifier. PeersURL is a comma-separated list of peer
// addresses; the address at position i is registered under replica index i.
// addr is the local TCP address passed to Startnode.
//
// It returns the channel on which client replies are published, the channel
// whose requests sendRoutine forwards to peers, and whether this replica is
// the primary of the initial view (view 1).
func NewReplica(id uint32, PeersURL string, addr string) (chan *pb.ClientReply, chan *pb.Request, bool) {
	replyChan := make(chan *pb.ClientReply)
	requestChan := make(chan *pb.Request)
	pn := &Replica{
		ID:          id,
		replicas:    make(map[uint32]string),
		activeView:  true,
		view:        1,
		sequence:    0,
		requestChan: requestChan,
		replyChan:   replyChan,
		errChan:     make(chan error),
		requests:    make(map[string][]*pb.Request),
		replies:     make(map[string][]*pb.ClientReply),
		doneChan:    make(chan string),
		lastReply:   nil,
		// Length 0 with capacity 10: logPendingVC appends to this slice, so
		// a non-zero length would leave ten nil requests in front of the
		// real entries.
		pendingVC: make([]*pb.Request, 0, 10),
		// Left pre-filled with zeros: lastExecuted indexes the final
		// element, so a fresh replica reports sequence 0.
		executed: make([]uint32, 10),
	}
	peers := strings.Split(PeersURL, ",")
	for num, peer := range peers {
		pn.replicas[uint32(num)] = peer
	}
	// Seed the checkpoint list so lastStable always has an element to return.
	pn.checkpoints = []*pb.Checkpoint{ToCheckpoint(0, []byte(""))}
	pn.Startnode(addr)
	return replyChan, requestChan, pn.isPrimary(pn.ID)
}

// Startnode starts the replica's networking by calling acceptConnections
// with addr, the local address to listen on.
func (rep *Replica) Startnode(addr string) {
	rep.acceptConnections(addr)
}

// Basic operations

// primary returns the index of the primary replica for the current view.
func (rep *Replica) primary() uint32 {
	return rep.newPrimary(rep.view)
}

// newPrimary returns the index of the primary replica for the given view:
// the view number modulo (number of registered peers + 1).
func (rep *Replica) newPrimary(view uint32) uint32 {
	modulus := uint32(len(rep.replicas) + 1)
	return view % modulus
}

// isPrimary reports whether ID is the primary of the current view.
func (rep *Replica) isPrimary(ID uint32) bool {
	current := rep.primary()
	return current == ID
}

// oneThird reports whether count is at least (peers+1)/3, using integer
// division.
func (rep *Replica) oneThird(count int) bool {
	threshold := (len(rep.replicas) + 1) / 3
	return count >= threshold
}

// overOneThird reports whether count is strictly greater than (peers+1)/3,
// using integer division.
func (rep *Replica) overOneThird(count int) bool {
	threshold := (len(rep.replicas) + 1) / 3
	return count > threshold
}

// twoThirds reports whether count is at least 2*(peers-1)/3, using integer
// division.
func (rep *Replica) twoThirds(count int) bool {
	threshold := 2 * (len(rep.replicas) - 1) / 3
	return count >= threshold
}

// overTwoThirds reports whether count is strictly greater than
// 2*(peers-1)/3, using integer division. The prepare and commit handlers use
// it as their quorum test.
func (rep *Replica) overTwoThirds(count int) bool {
	threshold := 2 * (len(rep.replicas) - 1) / 3
	return count > threshold
}

// lowWaterMark returns the sequence number of the last stable checkpoint.
func (rep *Replica) lowWaterMark() uint32 {
	stable := rep.lastStable()
	return stable.Sequence
}

// highWaterMark returns the low water mark plus
// CheckPointPeriod*ConstantFactor.
func (rep *Replica) highWaterMark() uint32 {
	window := CheckPointPeriod * ConstantFactor
	return rep.lowWaterMark() + window
}

// sequenceInRange reports whether sequence lies in the half-open window
// (lowWaterMark, highWaterMark].
func (rep *Replica) sequenceInRange(sequence uint32) bool {
	if sequence <= rep.lowWaterMark() {
		return false
	}
	return sequence <= rep.highWaterMark()
}

// lastExecuted returns the most recently recorded executed sequence number.
// It returns 0 when nothing has been recorded, instead of panicking on an
// empty slice.
func (rep *Replica) lastExecuted() uint32 {
	if len(rep.executed) == 0 {
		return 0
	}
	return rep.executed[len(rep.executed)-1]
}

// lastStable returns the most recently added checkpoint. The checkpoint list
// must not be empty; NewReplica seeds it with a checkpoint at sequence 0.
func (rep *Replica) lastStable() *pb.Checkpoint {
	newest := len(rep.checkpoints) - 1
	return rep.checkpoints[newest]
}

// theLastReply returns the reply with the greatest timestamp among
// rep.lastReply and the newest logged reply of every client. It returns nil
// when no reply has been produced at all.
func (rep *Replica) theLastReply() *pb.ClientReply {
	lastReply := rep.lastReply
	for _, replies := range rep.replies {
		if len(replies) == 0 {
			// Nothing logged for this client; indexing would panic.
			continue
		}
		reply := replies[len(replies)-1]
		// rep.lastReply starts out nil, so the first candidate must be
		// accepted without dereferencing it.
		if lastReply == nil || reply.Timestamp > lastReply.Timestamp {
			lastReply = reply
		}
	}
	return lastReply
}

// lastReplyToClient returns the newest reply logged for client, or nil when
// none has been logged.
func (rep *Replica) lastReplyToClient(client string) *pb.ClientReply {
	// A missing key yields a nil slice, so one length check covers both the
	// "unknown client" and the "empty log" case.
	if logged := rep.replies[client]; len(logged) > 0 {
		return logged[len(logged)-1]
	}
	return nil
}

// stateDigest returns RepDigest of the reply selected by theLastReply.
func (rep *Replica) stateDigest() []byte {
	latest := rep.theLastReply()
	return RepDigest(latest)
}

// isCheckpoint reports whether sequence is a multiple of CheckPointPeriod.
func (rep *Replica) isCheckpoint(sequence uint32) bool {
	remainder := sequence % CheckPointPeriod
	return remainder == 0
}

// addCheckpoint appends checkpoint to the list, making it the one returned
// by lastStable.
func (rep *Replica) addCheckpoint(checkpoint *pb.Checkpoint) {
	extended := append(rep.checkpoints, checkpoint)
	rep.checkpoints = extended
}

// acceptConnections starts the replica's network goroutines and returns
// immediately.
//
// It launches sendRoutine for outbound traffic and then listens on addr
// (TCP). Each accepted connection is read once with ReadMessage into a
// pb.Request, which is passed to handleRequest. Requests are handled one at
// a time on the accept goroutine.
//
// If the listener cannot be created the error is logged and no inbound
// connections are served; sendRoutine keeps running.
func (rep *Replica) acceptConnections(addr string) {
	go rep.sendRoutine()

	ln, err := net.Listen("tcp", addr)
	if err != nil {
		plog.Error("tcp connect error", "err", err)
		// ln is nil here; calling Accept on it would panic.
		return
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				plog.Error("Accept error", "err", err)
				// conn is unusable after a failed Accept; wait for the next one.
				continue
			}
			req := &pb.Request{}
			err = ReadMessage(conn, req)
			// One message per connection: release the socket once it has
			// been read. The close error is ignored because nothing further
			// is done with the connection.
			// NOTE(review): assumes ReadMessage does not keep conn for later
			// use — confirm against its definition.
			_ = conn.Close()
			if err != nil {
				plog.Error("readmessage error", "err", err)
				// Do not dispatch a request that failed to decode.
				continue
			}
			rep.handleRequest(req)
		}
	}()
}

// Sends

// multicast sends REQ to every registered peer address with WriteMessage.
//
// Delivery is attempted for all peers even if some fail, so one unreachable
// peer cannot keep the message from the others. The first error encountered
// is returned; nil means every send succeeded. Peers are visited in map
// iteration order, which is unspecified.
func (rep *Replica) multicast(REQ proto.Message) error {
	var firstErr error
	for _, replica := range rep.replicas {
		if err := WriteMessage(replica, REQ); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// sendRoutine forwards every request received on requestChan to the network
// until the channel is closed.
//
// Ack requests go only to the primary of the view named in the ack; they are
// dropped with an error log when no address is registered for that primary.
// Every other request is multicast to all peers. Send errors are pushed to
// errChan from a separate goroutine so this loop never blocks on the error
// channel.
func (rep *Replica) sendRoutine() {
	for REQ := range rep.requestChan {
		if _, isAck := REQ.Value.(*pb.Request_Ack); !isAck {
			if err := rep.multicast(REQ); err != nil {
				go func() {
					rep.errChan <- err
				}()
			}
			continue
		}

		target := rep.replicas[rep.newPrimary(REQ.GetAck().View)]
		if target == "" {
			plog.Error("primary not exeist")
			continue
		}
		if err := WriteMessage(target, REQ); err != nil {
			go func() {
				rep.errChan <- err
			}()
		}
	}
}

// allRequests returns every logged request of every kind in one slice. The
// kinds are concatenated in map iteration order, which is unspecified. The
// result is nil when nothing has been logged.
func (rep *Replica) allRequests() []*pb.Request {
	var all []*pb.Request
	for kind := range rep.requests {
		all = append(all, rep.requests[kind]...)
	}
	return all
}

// Log
// TODO: should logRequest check !hasRequest(REQ) before appending, to avoid duplicate log entries?

// logRequest appends REQ to the message log under the key that matches its
// type ("client", "pre-prepare", "prepare", "commit", "checkpoint",
// "view-change", "ack" or "new-view"). Requests of any other type are not
// stored; an info message is logged instead. No duplicate check is made.
func (rep *Replica) logRequest(REQ *pb.Request) {
	var kind string
	switch REQ.Value.(type) {
	case *pb.Request_Client:
		kind = "client"
	case *pb.Request_Preprepare:
		kind = "pre-prepare"
	case *pb.Request_Prepare:
		kind = "prepare"
	case *pb.Request_Commit:
		kind = "commit"
	case *pb.Request_Checkpoint:
		kind = "checkpoint"
	case *pb.Request_Viewchange:
		kind = "view-change"
	case *pb.Request_Ack:
		kind = "ack"
	case *pb.Request_Newview:
		kind = "new-view"
	default:
		// The logger takes a message followed by key/value pairs, as in the
		// other plog calls in this file, not a printf format.
		plog.Info("tried logging unrecognized request type", "replica", rep.ID)
		return
	}
	rep.requests[kind] = append(rep.requests[kind], REQ)
}

// logPendingVC stores REQ in the list of pending view changes. It returns an
// error, and stores nothing, when REQ is not a view-change request.
func (rep *Replica) logPendingVC(REQ *pb.Request) error {
	if _, isViewChange := REQ.Value.(*pb.Request_Viewchange); !isViewChange {
		return errors.New("Request is wrong type")
	}
	rep.pendingVC = append(rep.pendingVC, REQ)
	return nil
}

// logReply records reply for client, but only when the client has no logged
// reply yet or reply carries a greater timestamp than the newest logged one.
func (rep *Replica) logReply(client string, reply *pb.ClientReply) {
	previous := rep.lastReplyToClient(client)
	isNewer := previous == nil || previous.Timestamp < reply.Timestamp
	if !isNewer {
		return
	}
	rep.replies[client] = append(rep.replies[client], reply)
}

// Has requests

// hasRequest reports whether a request equivalent to REQ is already in the
// log. Only pre-prepare, prepare and commit requests are looked up; every
// other request type is reported as absent.
func (rep *Replica) hasRequest(REQ *pb.Request) bool {
	found := false
	switch REQ.Value.(type) {
	case *pb.Request_Preprepare:
		found = rep.hasRequestPreprepare(REQ)
	case *pb.Request_Prepare:
		found = rep.hasRequestPrepare(REQ)
	case *pb.Request_Commit:
		found = rep.hasRequestCommit(REQ)
	// Lookups for the view-change family are disabled:
	//case *pb.Request_Viewchange:
	//	found = rep.hasRequestViewChange(REQ)
	//case *pb.Request_Ack:
	//	found = rep.hasRequestAck(REQ)
	//case *pb.Request_Newview:
	//	found = rep.hasRequestNewView(REQ)
	default:
		// Unsupported request types are never considered logged.
	}
	return found
}

// hasRequestPreprepare reports whether the log holds a pre-prepare with the
// same view, sequence and digest as REQ.
func (rep *Replica) hasRequestPreprepare(REQ *pb.Request) bool {
	want := REQ.GetPreprepare()
	view, sequence, digest := want.View, want.Sequence, want.Digest
	for _, logged := range rep.requests["pre-prepare"] {
		got := logged.GetPreprepare()
		if got.View != view || got.Sequence != sequence {
			continue
		}
		if EQ(got.Digest, digest) {
			return true
		}
	}
	return false
}

// hasRequestPrepare reports whether the log holds a prepare with the same
// view, sequence, digest and sending replica as REQ.
func (rep *Replica) hasRequestPrepare(REQ *pb.Request) bool {
	want := REQ.GetPrepare()
	view, sequence := want.View, want.Sequence
	digest, replica := want.Digest, want.Replica
	for _, logged := range rep.requests["prepare"] {
		got := logged.GetPrepare()
		if got.View != view || got.Sequence != sequence {
			continue
		}
		if EQ(got.Digest, digest) && got.Replica == replica {
			return true
		}
	}
	return false
}

// hasRequestCommit reports whether the log holds a commit with the same
// view, sequence and sending replica as REQ.
func (rep *Replica) hasRequestCommit(REQ *pb.Request) bool {
	want := REQ.GetCommit()
	view, sequence, replica := want.View, want.Sequence, want.Replica
	for _, logged := range rep.requests["commit"] {
		got := logged.GetCommit()
		if got.View != view || got.Sequence != sequence {
			continue
		}
		if got.Replica == replica {
			return true
		}
	}
	return false
}

//func (rep *Replica) hasRequestViewChange(REQ *pb.Request) bool {
//	view := REQ.GetViewchange().View
//	replica := REQ.GetViewchange().Replica
//	for _, req := range rep.requests["view-change"] {
//		v := req.GetViewchange().View
//		r := req.GetViewchange().Replica
//		if v == view && r == replica {
//			return true
//		}
//	}
//	return false
//}

//func (rep *Replica) hasRequestAck(REQ *pb.Request) bool {
//	view := REQ.GetAck().View
//	replica := REQ.GetAck().Replica
//	viewchanger := REQ.GetAck().Viewchanger
//	for _, req := range rep.requests["ack"] {
//		v := req.GetAck().View
//		r := req.GetAck().Replica
//		vc := req.GetAck().Viewchanger
//		if v == view && r == replica && vc == viewchanger {
//			return true
//		}
//	}
//	return false
//}

//func (rep *Replica) hasRequestNewView(REQ *pb.Request) bool {
//	view := REQ.GetNewview().View
//	for _, req := range rep.requests["new-view"] {
//		v := req.GetNewview().View
//		if v == view {
//			return true
//		}
//	}
//	return false
//}

// Clear requests

//func (rep *Replica) clearRequestsBySeq(sequence uint32) {
//	rep.clearRequestClients()
//	rep.clearRequestPrepreparesBySeq(sequence)
//	rep.clearRequestPreparesBySeq(sequence)
//	rep.clearRequestCommitsBySeq(sequence)
//	//rep.clearRequestCheckpointsBySeq(sequence)
//}

//func (rep *Replica) clearRequestClients() {
//	clientReqs := rep.requests["client"]
//	lastTimestamp := rep.theLastReply().Timestamp
//	for idx, req := range rep.requests["client"] {
//		timestamp := req.GetClient().Timestamp
//		if lastTimestamp >= timestamp && idx < (len(clientReqs)-1) {
//			clientReqs = append(clientReqs[:idx], clientReqs[idx+1:]...)
//		} else {
//			clientReqs = clientReqs[:idx-1]
//		}
//	}
//	rep.requests["client"] = clientReqs
//}
//
//func (rep *Replica) clearRequestPrepreparesBySeq(sequence uint32) {
//	prePrepares := rep.requests["pre-prepare"]
//	for idx, req := range rep.requests["pre-prepare"] {
//		s := req.GetPreprepare().Sequence
//		if s <= sequence && idx < (len(prePrepares)-1) {
//			prePrepares = append(prePrepares[:idx], prePrepares[idx+1:]...)
//		} else {
//			prePrepares = prePrepares[:idx-1]
//		}
//	}
//	rep.requests["pre-prepare"] = prePrepares
//}
//
//func (rep *Replica) clearRequestPreparesBySeq(sequence uint32) {
//	prepares := rep.requests["prepare"]
//	for idx, req := range rep.requests["prepare"] {
//		s := req.GetPrepare().Sequence
//		if s <= sequence && idx < (len(prepares)-1) {
//			prepares = append(prepares[:idx], prepares[idx+1:]...)
//		} else {
//			prepares = prepares[:idx-1]
//		}
//	}
//	rep.requests["prepare"] = prepares
//}
//
//func (rep *Replica) clearRequestCommitsBySeq(sequence uint32) {
//	commits := rep.requests["commit"]
//	for idx, req := range rep.requests["commit"] {
//		s := req.GetCommit().Sequence
//		if s <= sequence && idx < (len(commits)-1) {
//			commits = append(commits[:idx], commits[idx+1:]...)
//		} else {
//			commits = commits[:idx-1]
//		}
//	}
//	rep.requests["commit"] = commits
//}
//
//func (rep *Replica) clearRequestCheckpointsBySeq(sequence uint32) {
//	checkpoints := rep.requests["checkpoint"]
//	for idx, req := range rep.requests["checkpoint"] {
//		s := req.GetCheckpoint().Sequence
//		if s <= sequence && idx < (len(checkpoints)-1) {
//			checkpoints = append(checkpoints[:idx], checkpoints[idx+1:]...)
//		} else {
//			checkpoints = checkpoints[:idx-1]
//		}
//	}
//	rep.requests["checkpoint"] = checkpoints
//}
//
//func (rep *Replica) clearRequestsByView(view uint32) {
//	rep.clearRequestPrepreparesByView(view)
//	rep.clearRequestPreparesByView(view)
//	rep.clearRequestCommitsByView(view)
//	//add others
//}
//
//func (rep *Replica) clearRequestPrepreparesByView(view uint32) {
//	prePrepares := rep.requests["pre-prepare"]
//	for idx, req := range rep.requests["pre-prepare"] {
//		v := req.GetPreprepare().View
//		if v < view {
//			prePrepares = append(prePrepares[:idx], prePrepares[idx+1:]...)
//		}
//	}
//	rep.requests["pre-prepare"] = prePrepares
//}
//
//func (rep *Replica) clearRequestPreparesByView(view uint32) {
//	prepares := rep.requests["prepare"]
//	for idx, req := range rep.requests["prepare"] {
//		v := req.GetPrepare().View
//		if v < view {
//			prepares = append(prepares[:idx], prepares[idx+1:]...)
//		}
//	}
//	rep.requests["prepare"] = prepares
//}
//
//func (rep *Replica) clearRequestCommitsByView(view uint32) {
//	commits := rep.requests["commit"]
//	for idx, req := range rep.requests["commit"] {
//		v := req.GetCommit().View
//		if v < view {
//			commits = append(commits[:idx], commits[idx+1:]...)
//		}
//	}
//	rep.requests["commit"] = commits
//}

// Handle requests

// handleRequest dispatches an incoming request to the handler for its type.
// Client, pre-prepare, prepare and commit requests are processed; any other
// type, including checkpoint, view-change and ack whose handlers are
// disabled, only produces an info log entry.
func (rep *Replica) handleRequest(REQ *pb.Request) {
	switch REQ.Value.(type) {
	case *pb.Request_Client:
		rep.handleRequestClient(REQ)
	case *pb.Request_Preprepare:
		rep.handleRequestPreprepare(REQ)
	case *pb.Request_Prepare:
		rep.handleRequestPrepare(REQ)
	case *pb.Request_Commit:
		rep.handleRequestCommit(REQ)

	//case *pb.Request_Checkpoint:
	//
	//	rep.handleRequestCheckpoint(REQ)
	//
	//case *pb.Request_Viewchange:
	//
	//	rep.handleRequestViewChange(REQ)
	//
	//case *pb.Request_Ack:
	//
	//	rep.handleRequestAck(REQ)

	default:
		// The logger takes a message followed by key/value pairs, as in the
		// other plog calls in this file, not a printf format.
		plog.Info("received unrecognized request type", "replica", rep.ID)
	}
}

// handleRequestClient processes a client request.
//
// The local sequence counter is advanced first, unconditionally. If the
// newest reply logged for the client carries the same timestamp as the
// request, that reply's result is re-sent on replyChan and nothing else
// happens. Otherwise the request is logged and, when this replica is the
// primary, a pre-prepare for the new sequence number is logged and queued
// on requestChan.
func (rep *Replica) handleRequestClient(REQ *pb.Request) {
	rep.sequence++

	clientReq := REQ.GetClient()
	client, timestamp := clientReq.Client, clientReq.Timestamp

	if previous := rep.lastReplyToClient(client); previous != nil && previous.Timestamp == timestamp {
		resend := ToReply(rep.view, timestamp, client, rep.ID, previous.Result)
		rep.logReply(client, resend)
		go func() {
			rep.replyChan <- resend
		}()
		return
	}

	rep.logRequest(REQ)
	if !rep.isPrimary(rep.ID) {
		// Backups only record the request; the primary drives ordering.
		return
	}

	prePrepare := ToRequestPreprepare(rep.view, rep.sequence, ReqDigest(REQ), rep.ID)
	rep.logRequest(prePrepare)
	plog.Info("Client-request done")
	go func() {
		rep.requestChan <- prePrepare
	}()
}

// handleRequestPreprepare processes a pre-prepare.
//
// The request is ignored unless it comes from the current primary, is for
// the current view, has a sequence number inside the water marks, and does
// not conflict with a logged pre-prepare for the same view and sequence but
// a different digest. An accepted pre-prepare is logged; a matching prepare
// is then logged and queued on requestChan, unless one is already logged.
func (rep *Replica) handleRequestPreprepare(REQ *pb.Request) {
	prePrepare := REQ.GetPreprepare()

	if sender := prePrepare.Replica; !rep.isPrimary(sender) {
		return
	}
	view := prePrepare.View
	if view != rep.view {
		return
	}
	sequence := prePrepare.Sequence
	if !rep.sequenceInRange(sequence) {
		return
	}
	digest := prePrepare.Digest

	for _, logged := range rep.requests["pre-prepare"] {
		other := logged.GetPreprepare()
		sameSlot := other.View == view && other.Sequence == sequence
		if sameSlot && !EQ(other.Digest, digest) {
			// The slot is already taken by a different request.
			return
		}
	}

	rep.logRequest(REQ)
	plog.Info("pre-prepare done")

	prepare := ToRequestPrepare(view, sequence, digest, rep.ID)
	if rep.hasRequest(prepare) {
		return
	}
	rep.logRequest(prepare)
	go func() {
		rep.requestChan <- prepare
	}()
}

// handleRequestPrepare processes a prepare.
//
// The request is ignored unless it is for the current view and its sequence
// number lies inside the water marks. Otherwise it is logged, and the logged
// prepares with the same view, sequence and digest are counted. Once that
// count passes overTwoThirds, a commit for the sequence is logged and queued
// on requestChan, unless this replica has already logged one.
//
// The count is taken synchronously on the caller's goroutine, so the result
// reflects the whole log and no goroutine is left behind.
func (rep *Replica) handleRequestPrepare(REQ *pb.Request) {
	prepare := REQ.GetPrepare()
	replica := prepare.Replica

	view := prepare.View
	if rep.view != view {
		return
	}

	sequence := prepare.Sequence
	if !rep.sequenceInRange(sequence) {
		return
	}

	digest := prepare.Digest

	rep.logRequest(REQ)

	// NOTE(review): entries are counted per message, not per distinct
	// replica, so a repeated prepare from one replica adds to the quorum.
	matching, fromSender := 0, 0
	for _, req := range rep.requests["prepare"] {
		p := req.GetPrepare()
		if p.View != view || p.Sequence != sequence || !EQ(p.Digest, digest) {
			continue
		}
		if p.Replica == replica {
			fromSender++
		}
		matching++
	}
	// REQ itself was logged just above, so more than one entry from its
	// sender means that replica sent the same prepare repeatedly.
	if fromSender > 1 {
		plog.Debug("multiple prepare requests", "replica", replica, "count", fromSender)
	}
	if !rep.overTwoThirds(matching) {
		return
	}

	req := ToRequestCommit(view, sequence, rep.ID)
	if rep.hasRequest(req) {
		return
	}

	rep.logRequest(req)
	plog.Info("prepare done")
	go func() {
		rep.requestChan <- req
	}()
}

// handleRequestCommit processes a commit and executes the request once a
// quorum of commits is logged.
//
// The commit is ignored unless it is for the current view and its sequence
// number lies inside the water marks. Otherwise it is logged and the logged
// commits for the same view and sequence are counted. When the count passes
// overTwoThirds, the digest of the pre-prepare logged for that view and
// sequence is looked up and the first logged client request with that digest
// is executed: the sequence is appended to executed, a reply is logged,
// stored as lastReply and published on replyChan. If the sequence is a
// checkpoint, a checkpoint request is also logged and queued on requestChan.
//
// Nothing is executed when no pre-prepare is logged for the sequence.
//
// NOTE(review): there is no "already executed" check, so each further commit
// arriving after the quorum was reached runs the execution path again.
func (rep *Replica) handleRequestCommit(REQ *pb.Request) {
	commit := REQ.GetCommit()

	view := commit.View
	if rep.view != view {
		return
	}

	sequence := commit.Sequence
	if !rep.sequenceInRange(sequence) {
		return
	}

	replica := commit.Replica

	rep.logRequest(REQ)

	// NOTE(review): entries are counted per message, not per distinct
	// replica, so a repeated commit from one replica adds to the quorum.
	matching, fromSender := 0, 0
	for _, req := range rep.requests["commit"] {
		c := req.GetCommit()
		if c.View != view || c.Sequence != sequence {
			continue
		}
		if c.Replica == replica {
			fromSender++
		}
		matching++
	}
	// REQ itself was logged just above, so more than one entry from its
	// sender means that replica sent the same commit repeatedly.
	if fromSender > 1 {
		plog.Debug("multiple commit requests", "replica", replica, "count", fromSender)
	}
	if !rep.overTwoThirds(matching) {
		return
	}

	// Digest the primary bound to this sequence in this view.
	var digest []byte
	found := false
	for _, req := range rep.requests["pre-prepare"] {
		pp := req.GetPreprepare()
		if pp.View == view && pp.Sequence == sequence {
			digest = pp.Digest
			found = true
			break
		}
	}
	if !found {
		return
	}

	for _, req := range rep.requests["client"] {
		if !EQ(ReqDigest(req), digest) {
			continue
		}

		clientReq := req.GetClient()
		op := clientReq.Op
		timestamp := clientReq.Timestamp
		client := clientReq.Client
		result := &pb.Result{Value: op.Value}

		rep.executed = append(rep.executed, sequence)
		reply := ToReply(view, timestamp, client, rep.ID, result)

		rep.logReply(client, reply)
		plog.Info("commit done")
		rep.lastReply = reply

		go func() {
			rep.replyChan <- reply
		}()

		if !rep.isCheckpoint(sequence) {
			return
		}
		checkpointReq := ToRequestCheckpoint(sequence, rep.stateDigest(), rep.ID)
		rep.logRequest(checkpointReq)

		go func() {
			rep.requestChan <- checkpointReq
		}()
		return
	}
}

//func (rep *Replica) handleRequestCheckpoint(REQ *pb.Request) {
//
//	sequence := REQ.GetCheckpoint().Sequence
//
//	if !rep.sequenceInRange(sequence) {
//		return
//	}
//
//	digest := REQ.GetCheckpoint().Digest
//	replica := REQ.GetCheckpoint().Replica
//
//	count := 0
//	for _, req := range rep.requests["checkpoint"] {
//		s := req.GetCheckpoint().Sequence
//		d := req.GetCheckpoint().Digest
//		r := req.GetCheckpoint().Replica
//		if s != sequence || !EQ(d, digest) {
//			continue
//		}
//		if r == replica {
//			plog.Info("Replica %d sent multiple checkpoint requests\n", replica)
//			//continue
//		}
//		count++
//		if !rep.overTwoThirds(count) {
//			continue
//		}
//		// rep.clearEntries(sequence)
//		rep.clearRequestsBySeq(sequence)
//		checkpoint := ToCheckpoint(sequence, digest)
//		rep.addCheckpoint(checkpoint)
//		plog.Info("checkpoint and clear request done")
//		return
//	}
//
//}

//func (rep *Replica) handleRequestViewChange(REQ *pb.Request) {
//
//	view := REQ.GetViewchange().View
//
//	if view < rep.view {
//		return
//	}
//
//	reqViewChange := REQ.GetViewchange()
//
//	for _, prep := range reqViewChange.GetPreps() {
//		v := prep.View
//		s := prep.Sequence
//		if v >= view || !rep.sequenceInRange(s) {
//			return
//		}
//	}
//
//	for _, prePrep := range reqViewChange.GetPrepreps() {
//		v := prePrep.View
//		s := prePrep.Sequence
//		if v >= view || !rep.sequenceInRange(s) {
//			return
//		}
//	}
//
//	for _, checkpoint := range reqViewChange.GetCheckpoints() {
//		s := checkpoint.Sequence
//		if !rep.sequenceInRange(s) {
//			return
//		}
//	}
//
//	if rep.hasRequest(REQ) {
//		return
//	}
//
//	rep.logRequest(REQ)
//
//	viewchanger := reqViewChange.Replica
//
//	req := ToRequestAck(
//		view,
//		rep.ID,
//		viewchanger,
//		ReqDigest(REQ))
//
//	go func() {
//		rep.requestChan <- req
//	}()
//}

//func (rep *Replica) handleRequestAck(REQ *pb.Request) {
//
//	view := REQ.GetAck().View
//	primaryID := rep.newPrimary(view)
//
//	if rep.ID != primaryID {
//		return
//	}
//
//	if rep.hasRequest(REQ) {
//		return
//	}
//
//	rep.logRequest(REQ)
//
//	replica := REQ.GetAck().Replica
//	viewchanger := REQ.GetAck().Viewchanger
//	digest := REQ.GetAck().Digest
//
//	reqViewChange := make(chan *pb.Request, 1)
//	twoThirds := make(chan bool, 1)
//
//	go func() {
//		for _, req := range rep.requests["view-change"] {
//			v := req.GetViewchange().View
//			vc := req.GetViewchange().Replica
//			if v == view && vc == viewchanger {
//				reqViewChange <- req
//			}
//		}
//		reqViewChange <- nil
//	}()
//
//	go func() {
//		count := 0
//		for _, req := range rep.requests["ack"] {
//			v := req.GetAck().View
//			r := req.GetAck().Replica
//			vc := req.GetAck().Viewchanger
//			d := req.GetAck().Digest
//			if v != view || vc != viewchanger || !EQ(d, digest) {
//				continue
//			}
//			if r == replica {
//				plog.Info("Replica %d sent multiple ack requests\n", replica)
//				continue
//			}
//			count++
//			if rep.twoThirds(count) {
//				twoThirds <- true
//				return
//			}
//		}
//		twoThirds <- false
//	}()
//
//	req := <-reqViewChange
//
//	if req == nil || !<-twoThirds {
//		return
//	}
//
//	rep.logPendingVC(req)
//
//	// When to send new view?
//	rep.requestNewView(view)
//}

//func (rep *Replica) handleRequestNewView(REQ *pb.Request) {
//
//	view := REQ.GetNewview().View
//
//	if view == 0 || view < rep.view {
//		return
//	}
//
//	replica := REQ.GetNewview().Replica
//	primary := rep.newPrimary(view)
//
//	if replica != primary {
//		return
//	}
//
//	if rep.hasRequest(REQ) {
//		return
//	}
//
//	rep.logRequest(REQ)
//
//	rep.processNewView(REQ)
//}

//func (rep *Replica) correctViewChanges(viewChanges []*pb.ViewChange) (requests []*pb.Request) {
//
//	// Returns requests if correct, else returns nil
//
//	valid := false
//	for _, vc := range viewChanges {
//		for _, req := range rep.requests["view-change"] {
//			d := ReqDigest(req)
//			if !EQ(d, vc.Digest) {
//				continue
//			}
//			requests = append(requests, req)
//			v := req.GetViewchange().View
//			// VIEW or rep.view??
//			if v == rep.view {
//				valid = true
//				break
//			}
//		}
//		if !valid {
//			return nil
//		}
//	}
//
//	if rep.isPrimary(rep.ID) {
//		reps := make(map[uint32]int)
//		valid = false
//		for _, req := range rep.requests["ack"] {
//			reqAck := req.GetAck()
//			reps[reqAck.Replica]++
//			if rep.twoThirds(reps[reqAck.Replica]) { //-2
//				valid = true
//				break
//			}
//		}
//		if !valid {
//			return nil
//		}
//	}
//
//	return
//}

//func (rep *Replica) correctSummaries(requests []*pb.Request, summaries []*pb.Summary) (correct bool) {
//
//	// Verify SUMMARIES
//
//	var start uint32
//	var digest []byte
//	digests := make(map[uint32][]byte)
//
//	for _, summary := range summaries {
//		s := summary.Sequence
//		d := summary.Digest
//		if _d, ok := digests[s]; ok && !EQ(_d, d) {
//			return
//		} else if !ok {
//			digests[s] = d
//		}
//		if s < start || start == uint32(0) {
//			start = s
//			digest = d
//		}
//	}
//
//	var A1 []*pb.Request
//	var A2 []*pb.Request
//
//	valid := false
//	for _, req := range requests {
//		reqViewChange := req.GetViewchange()
//		s := reqViewChange.Sequence
//		if s <= start {
//			A1 = append(A1, req)
//		}
//		checkpoints := reqViewChange.GetCheckpoints()
//		for _, checkpoint := range checkpoints {
//			if checkpoint.Sequence == start && EQ(checkpoint.Digest, digest) {
//				A2 = append(A2, req)
//				break
//			}
//		}
//		if rep.twoThirds(len(A1)) && rep.oneThird(len(A2)) {
//			valid = true
//			break
//		}
//	}
//
//	if !valid {
//		return
//	}
//
//	end := start + CheckPointPeriod*ConstantFactor
//
//	for seq := start; seq <= end; seq++ {
//
//		valid = false
//
//		for _, summary := range summaries {
//
//			if summary.Sequence != seq {
//				continue
//			}
//
//			if summary.Digest != nil {
//
//				var view uint32
//
//				for _, req := range requests {
//					reqViewChange := req.GetViewchange()
//					preps := reqViewChange.GetPreps()
//					for _, prep := range preps {
//						s := prep.Sequence
//						d := prep.Digest
//						if s != summary.Sequence || !EQ(d, summary.Digest) {
//							continue
//						}
//						v := prep.View
//						if v > view {
//							view = v
//						}
//					}
//				}
//
//				verifiedA1 := make(chan bool, 1)
//
//				// Verify A1
//				go func() {
//
//					var A1 []*pb.Request
//
//				FOR_LOOP:
//					for _, req := range requests {
//						reqViewChange := req.GetViewchange()
//						s := reqViewChange.Sequence
//						if s >= summary.Sequence {
//							continue
//						}
//						preps := reqViewChange.GetPreps()
//						for _, prep := range preps {
//							s = prep.Sequence
//							if s != summary.Sequence {
//								continue
//							}
//							d := prep.Digest
//							v := prep.View
//							if v > view || (v == view && !EQ(d, summary.Digest)) {
//								continue FOR_LOOP
//							}
//						}
//						A1 = append(A1, req)
//						if rep.twoThirds(len(A1)) {
//							verifiedA1 <- true
//							return
//						}
//					}
//					verifiedA1 <- false
//				}()
//
//				verifiedA2 := make(chan bool, 1)
//
//				// Verify A2
//				go func() {
//
//					var A2 []*pb.Request
//
//					for _, req := range requests {
//						reqViewChange := req.GetViewchange()
//						prePreps := reqViewChange.GetPrepreps()
//						for _, prePrep := range prePreps {
//							s := prePrep.Sequence
//							d := prePrep.Digest
//							v := prePrep.View
//							if s == summary.Sequence && EQ(d, summary.Digest) && v >= view {
//								A2 = append(A2, req)
//								break
//							}
//						}
//						if rep.oneThird(len(A2)) {
//							verifiedA2 <- true
//							return
//						}
//					}
//					verifiedA2 <- false
//				}()
//
//				if !<-verifiedA1 || !<-verifiedA2 {
//					continue
//				}
//
//				valid = true
//				break
//
//			} else {
//
//				var A1 []*pb.Request
//
//			FOR_LOOP:
//
//				for _, req := range requests {
//
//					reqViewChange := req.GetViewchange()
//
//					s := reqViewChange.Sequence
//
//					if s >= summary.Sequence {
//						continue
//					}
//
//					preps := reqViewChange.GetPreps()
//					for _, prep := range preps {
//						if prep.Sequence == summary.Sequence {
//							continue FOR_LOOP
//						}
//					}
//
//					A1 = append(A1, req)
//					if rep.twoThirds(len(A1)) {
//						valid = true
//						break
//					}
//				}
//				if valid {
//					break
//				}
//			}
//		}
//		if !valid {
//			return
//		}
//	}
//
//	return true
//}

//func (rep *Replica) processNewView(REQ *pb.Request) (success bool) {
//
//	if rep.activeView {
//		return
//	}
//
//	reqNewView := REQ.GetNewview()
//
//	viewChanges := reqNewView.GetViewchanges()
//	requests := rep.correctViewChanges(viewChanges)
//
//	if requests == nil {
//		return
//	}
//
//	summaries := reqNewView.GetSummaries()
//	correct := rep.correctSummaries(requests, summaries)
//
//	if !correct {
//		return
//	}
//
//	var h uint32
//	for _, checkpoint := range rep.checkpoints {
//		if checkpoint.Sequence < h || h == uint32(0) {
//			h = checkpoint.Sequence
//		}
//	}
//
//	var s uint32
//	for _, summary := range summaries {
//		if summary.Sequence < s || s == uint32(0) {
//			s = summary.Sequence
//		}
//		if summary.Sequence > h {
//			valid := false
//			for _, req := range rep.requests["view-change"] { //in
//				if EQ(ReqDigest(req), summary.Digest) {
//					valid = true
//					break
//				}
//			}
//			if !valid {
//				return
//			}
//		}
//	}
//
//	if h < s {
//		return
//	}
//
//	// Process new view
//	rep.activeView = true
//
//	for _, summary := range summaries {
//
//		if rep.ID != reqNewView.Replica {
//			req := ToRequestPrepare(
//				reqNewView.View,
//				summary.Sequence,
//				summary.Digest,
//				rep.ID) // the backup sends/logs prepare
//
//			go func() {
//				if !rep.hasRequest(req) {
//					rep.requestChan <- req
//				}
//			}()
//			if summary.Sequence <= h {
//				continue
//			}
//
//			if !rep.hasRequest(req) {
//				rep.logRequest(req)
//			}
//		} else {
//			if summary.Sequence <= h {
//				break
//			}
//		}
//
//		req := ToRequestPreprepare(
//			reqNewView.View,
//			summary.Sequence,
//			summary.Digest,
//			reqNewView.Replica) // new primary pre-prepares
//
//		if !rep.hasRequest(req) {
//			rep.logRequest(req)
//		}
//	}
//
//	var maxSequence uint32
//	for _, req := range rep.requests["pre-prepare"] {
//		reqPrePrepare := req.GetPreprepare()
//		if reqPrePrepare.Sequence > maxSequence {
//			maxSequence = reqPrePrepare.Sequence
//		}
//	}
//	rep.sequence = maxSequence
//	return true
//}

//func (rep *Replica) prePrepBySequence(sequence uint32) []*pb.Entry {
//	var view uint32
//	var requests []*pb.Request
//	for _, req := range rep.requests["pre-prepare"] {
//		v := req.GetPreprepare().View
//		s := req.GetPreprepare().Sequence
//		if v >= view && s == sequence {
//			view = v
//			requests = append(requests, req)
//		}
//	}
//	if requests == nil {
//		return nil
//	}
//	var prePreps []*pb.Entry
//	for _, req := range requests {
//		v := req.GetPreprepare().View
//		if v == view {
//			s := req.GetPreprepare().Sequence
//			d := req.GetPreprepare().Digest
//			prePrep := ToEntry(s, d, v)
//			prePreps = append(prePreps, prePrep)
//		}
//	}
//FOR_LOOP:
//	for _, prePrep := range prePreps {
//		for _, req := range rep.allRequests() { //TODO: optimize
//			if EQ(ReqDigest(req), prePrep.Digest) {
//				continue FOR_LOOP
//			}
//		}
//		return nil
//	}
//	return prePreps
//}
//
//func (rep *Replica) prepBySequence(sequence uint32) ([]*pb.Entry, []*pb.Entry) {
//
//	prePreps := rep.prePrepBySequence(sequence)
//
//	if prePreps == nil {
//		return nil, nil
//	}
//
//	var preps []*pb.Entry
//
//FOR_LOOP:
//	for _, prePrep := range prePreps {
//
//		view := prePrep.View
//		digest := prePrep.Digest
//
//		replicas := make(map[uint32]int)
//		for _, req := range rep.requests["prepare"] {
//			reqPrepare := req.GetPrepare()
//			v := reqPrepare.View
//			s := reqPrepare.Sequence
//			d := reqPrepare.Digest
//			if v == view && s == sequence && EQ(d, digest) {
//				r := reqPrepare.Replica
//				replicas[r]++
//				if rep.twoThirds(replicas[r]) {
//					prep := ToEntry(s, d, v)
//					preps = append(preps, prep)
//					continue FOR_LOOP
//				}
//			}
//		}
//		return prePreps, nil
//	}
//
//	return prePreps, preps
//}

/*
func (rep *Replica) prepBySequence(sequence uint32) *pb.Entry {
	var REQ *pb.Request
	var view uint32
	// Looking for a commit that
	// replica sent with sequence#
	for _, req := range rep.requests["commit"] {
		v := req.GetCommit().View
		s := req.GetCommit().Sequence
		r := req.GetCommit().Replica
		if v >= view && s == sequence && r == rep.ID {
			REQ = req
			view = v
		}
	}
	if REQ == nil {
		return nil
	}
	entry := pb.ToEntry(sequence, nil, view)
	return entry
}

func (rep *Replica) prePrepBySequence(sequence uint32) *pb.Entry {
	var REQ *pb.Request
	var view uint32
	// Looking for prepare/pre-prepare that
	// replica sent with matching sequence
	for _, req := range rep.requests["pre-prepare"] {
		v := req.GetPreprepare().View
		s := req.GetPreprepare().Sequence
		r := req.GetPreprepare().Replica
		if v >= view && s == sequence && r == rep.ID {
			REQ = req
			view = v
		}
	}
	for _, req := range rep.requests["prepare"] {
		v := req.GetPreprepare().View
		s := req.GetPreprepare().Sequence
		r := req.GetPreprepare().Replica
		if v >= view && s == sequence && r == rep.ID {
			REQ = req
			view = v
		}
	}
	if REQ == nil {
		return nil
	}
	var digest []byte
	switch REQ.Value.(type) {
	case *pb.Request_Preprepare:
		digest = REQ.GetPreprepare().Digest
	case *pb.Request_Prepare:
		digest = REQ.GetPrepare().Digest
	}
	entry := pb.ToEntry(sequence, digest, view)
	return entry
}
*/

//func (rep *Replica) requestViewChange(view uint32) {
//
//	if view != rep.view+1 {
//		return
//	}
//	rep.view = view
//	rep.activeView = false
//
//	var prePreps []*pb.Entry
//	var preps []*pb.Entry
//
//	start := rep.lowWaterMark() + 1
//	end := rep.highWaterMark()
//
//	for s := start; s <= end; s++ {
//		_prePreps, _preps := rep.prepBySequence(s)
//		if _prePreps != nil {
//			prePreps = append(prePreps, _prePreps...)
//		}
//		if _preps != nil {
//			preps = append(preps, _preps...)
//		}
//	}
//
//	sequence := rep.lowWaterMark()
//
//	req := ToRequestViewChange(
//		view,
//		sequence,
//		rep.checkpoints, //
//		preps,
//		prePreps,
//		rep.ID)
//
//	rep.logRequest(req)
//
//	go func() {
//		rep.requestChan <- req
//	}()
//
//	rep.clearRequestsByView(view)
//}
//
//func (rep *Replica) createNewView(view uint32) (request *pb.Request) {
//
//	// Returns a new-view request on success, or nil if no starting
//	// checkpoint could be selected.
//
//	// Build one ViewChange entry per pending view-change request.
//	viewChanges := make([]*pb.ViewChange, len(rep.pendingVC))
//
//	for idx := range viewChanges {
//		req := rep.pendingVC[idx]
//		viewchanger := req.GetViewchange().Replica
//		vc := ToViewChange(viewchanger, ReqDigest(req))
//		viewChanges[idx] = vc
//	}
//
//	var summaries []*pb.Summary
//	var summary *pb.Summary
//
//	start := rep.lowWaterMark() + 1
//	end := rep.highWaterMark()
//
//	// select starting checkpoint
//FOR_LOOP_1:
//	for seq := start; seq <= end; seq++ {
//
//		overLWM := 0
//		var digest []byte
//		digests := make(map[string]int)
//
//		for _, req := range rep.pendingVC {
//			reqViewChange := req.GetViewchange()
//			if reqViewChange.Sequence <= seq {
//				overLWM++
//			}
//			for _, checkpoint := range reqViewChange.GetCheckpoints() {
//				if checkpoint.Sequence == seq {
//					d := checkpoint.Digest
//					digests[string(d)]++
//					if rep.oneThird(digests[string(d)]) {
//						digest = d
//						break
//					}
//				}
//			}
//			if rep.twoThirds(overLWM) && rep.oneThird(digests[string(digest)]) {
//				summary = ToSummary(seq, digest)
//				continue FOR_LOOP_1
//			}
//		}
//	}
//
//	if summary == nil {
//		return
//	}
//
//	summaries = append(summaries, summary)
//
//	start = summary.Sequence
//	end = start + CheckPointPeriod*ConstantFactor
//
//	// select summaries
//	// TODO: optimize
//FOR_LOOP_2:
//	for seq := start; seq <= end; seq++ {
//
//		for _, REQ := range rep.pendingVC {
//
//			sequence := REQ.GetViewchange().Sequence
//
//			if sequence != seq {
//				continue
//			}
//
//			var A1 []*pb.Request
//			var A2 []*pb.Request
//
//			view := REQ.GetViewchange().View
//			digest := ReqDigest(REQ)
//
//		FOR_LOOP_3:
//			for _, req := range rep.pendingVC {
//
//				reqViewChange := req.GetViewchange()
//
//				if reqViewChange.Sequence < sequence {
//					preps := reqViewChange.GetPreps()
//					for _, prep := range preps {
//						if prep.Sequence != sequence {
//							continue
//						}
//						if prep.View > view || (prep.View == view && !EQ(prep.Digest, digest)) {
//							continue FOR_LOOP_3
//						}
//					}
//					A1 = append(A1, req)
//				}
//				prePreps := reqViewChange.GetPrepreps()
//				for _, prePrep := range prePreps {
//					if prePrep.Sequence != sequence {
//						continue
//					}
//					if prePrep.View >= view && EQ(prePrep.Digest, digest) {
//						A2 = append(A2, req)
//						continue FOR_LOOP_3
//					}
//				}
//			}
//
//			if rep.twoThirds(len(A1)) && rep.oneThird(len(A2)) {
//				summary = ToSummary(sequence, digest)
//				summaries = append(summaries, summary)
//				continue FOR_LOOP_2
//			}
//		}
//	}
//
//	request = ToRequestNewView(view, viewChanges, summaries, rep.ID)
//	return
//}
//
//func (rep *Replica) requestNewView(view uint32) {
//
//	req := rep.createNewView(view)
//
//	if req == nil || rep.hasRequest(req) {
//		return
//	}
//
//	// Process new view
//
//	success := rep.processNewView(req)
//
//	if !success {
//		return
//	}
//
//	rep.logRequest(req)
//
//	go func() {
//		rep.requestChan <- req
//	}()
//
//}
